package com.calabar.put;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * <p/>
 * <li>Description: 数据插入Hbase实现</li>
 * <li>@author: zijian.wu</li>
 * <li>Date: 2018/7/5 10:00</li>
 */
@Component
public class SendToHbase {
    /**
     * 日志记录
     */
    private static final Logger LOGGER = LogManager.getLogger(SendToHbase.class);

    /**
     * Hbase 连接对象
     */
    private static Connection CONN;

    /**
     * Zookeeper Sever
     */
    private static String ZOOKEEPER;

    /**
     * Zookeeper Port
     */
    private static String ZOOKEEPER_PORT;

    @Value("${hbase.zookeeper.server}")
    public void setZookeeperServer(String zookeeper) {
        ZOOKEEPER = zookeeper;
    }

    @Value("${hbase.zookeeper.port}")
    public void setZookeeperPort(String port) {
        ZOOKEEPER_PORT = port;
    }

    /**
     * 获取Hbase的连接
     *
     * @return Hbase connection
     * @throws Exception the exception
     */
    public static Connection getConnection() throws Exception {
        if (null == CONN || CONN.isClosed()) {
            try {
                Configuration conf = new Configuration();
                conf.set("hbase.zookeeper.quorum", ZOOKEEPER);
                conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);
                conf.set("zookeeper.znode.parent", "/hbase-unsecure");
                CONN = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                LOGGER.error("无法连接Hbase", e);
            }
        }
        return CONN;
    }

    /**
     * 关闭hbase连接
     */

    public static void closeConnection() {
        try {
            CONN.close();
        } catch (IOException e) {
            LOGGER.error("无连接", e);
        }
    }

    /**
     * 批量插入
     *
     * @param tableName      表名
     * @param columnFamilies 列族组
     * @param formattedData  规范化数据
     * @throws Exception the exception
     */
    public static void putDataList(String tableName, String[] columnFamilies,
                                   Map<String, List<Map<String, Object>>> formattedData) throws Exception {
        //获取连接
        Connection connection = getConnection();

        Table table = null;

        try {
            table = connection.getTable(TableName.valueOf(tableName));
        } catch (Exception e) {
            LOGGER.error("无法连接到表", e);
        }

        List<Put> list = new ArrayList<>();

        //遍历map获取rowkey，和本行中数据数组
        for (Map.Entry<String, List<Map<String, Object>>> entry : formattedData.entrySet()) {
            //行键
            String rowkey = entry.getKey();
            //一行中的数据
            List<Map<String, Object>> rowdata = entry.getValue();
            //PUT
            Put put = new Put(Bytes.toBytes(rowkey));

            //判断columnFamilies与本行中数据数组长度是否一致
            if (columnFamilies.length != rowdata.size()) {
                throw new Exception("列族数与数据组长度不一致");
            }

            //循环插入数据 第i组数据放入第i组columnFamilies
            for (int i = 0; i < columnFamilies.length; i++) {
                for (Map.Entry<String, Object> subEntry : rowdata.get(i).entrySet()) {
                    //列
                    String column = subEntry.getKey();
                    //单元
                    String cell = subEntry.getValue().toString();

                    put.addColumn(Bytes.toBytes(columnFamilies[i]), Bytes.toBytes(column), Bytes.toBytes(cell));

                }
            }
            list.add(put);
        }
        table.put(list);

        //关闭表
        table.close();
    }

}
